[runtime] Filter resumed currentProcessingKeys by subtask ownership a…#581

Open
joeyutong wants to merge 1 commit into apache:main from joeyutong:main

@joeyutong

…fter rescaling

Purpose of change

ActionExecutionOperator persists currentProcessingKeysOpState as union operator state so all subtasks can see in-flight keys after restore/rescaling. However, tryResumeProcessActionTasks() resumed every restored key without checking whether the key belonged to the current subtask.

That behavior is incorrect after scale-out: multiple subtasks can receive the same union state entry, so non-owner subtasks may also enqueue resume work for keys they do not own. This can lead to duplicated processing after recovery.

This change fixes the restore path by filtering currentProcessingKeysOpState with subtask ownership before resubmitting work:

  • compute the key group for each restored key
  • compute the owning subtask for the current maxParallelism and parallelism
  • only resume keys owned by the current subtask

The ownership check is encapsulated in isKeyOwnedByCurrentSubtask(...) in ActionExecutionOperator.java.
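
The three steps above can be sketched as follows. This is a minimal, self-contained illustration and not the code from the PR: the class and method names (`KeyOwnershipSketch`, `keyGroupFor`, `ownerSubtask`, `keysToResume`) are hypothetical, and the key-group hash is a simplified stand-in. Flink's `KeyGroupRangeAssignment` additionally applies a murmur hash to `hashCode()`, so the concrete group numbers here will differ from a real job.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the ownership filter; names are illustrative only. */
final class KeyOwnershipSketch {

    // Assumption: simplified hash. Flink scrambles hashCode() with a murmur hash
    // before taking the modulo; plain floorMod keeps this sketch dependency-free.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are split into contiguous ranges, one range per subtask.
    static int ownerSubtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static boolean isKeyOwnedBySubtask(
            Object key, int maxParallelism, int parallelism, int subtaskIndex) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        return ownerSubtask(keyGroup, maxParallelism, parallelism) == subtaskIndex;
    }

    // Every subtask receives the full union list on restore; keep only owned keys.
    static <K> List<K> keysToResume(
            List<K> restoredKeys, int maxParallelism, int parallelism, int subtaskIndex) {
        List<K> owned = new ArrayList<>();
        for (K key : restoredKeys) {
            if (isKeyOwnedBySubtask(key, maxParallelism, parallelism, subtaskIndex)) {
                owned.add(key);
            }
        }
        return owned;
    }
}
```

Under these assumptions, with `maxParallelism=128` and `parallelism=2`, the Integer keys 5 and 63 belong to subtask 0 and the keys 64 and 100 belong to subtask 1, so each restored key is resumed by exactly one subtask even though both subtasks see the whole list.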

Tests

Added an operator-level restore/rescaling regression test in ActionExecutionOperatorTest.java:

  • snapshot state from parallelism=1
  • repartition it to parallelism=2
  • restore both subtasks
  • verify only the owner subtask gets a resume mail, while the non-owner subtask does not

Documentation

  • [ ] doc-needed
  • [x] doc-not-needed
  • [ ] doc-included

@github-actions (bot) added labels on Mar 20, 2026:

  • doc-not-needed: Your PR changes do not impact docs
  • fixVersion/0.3.0: The feature or bug should be implemented/fixed in the 0.3.0 version.
  • priority/major: Default priority of the PR or issue.
@wenjin272
Collaborator

Thanks for your work @joeyutong, LGTM. Could you take a look at your convenience?

In PR #80, it was discussed that keys outside the key range need to be filtered out after rescaling. However, it appears that this was not implemented at the time.

I suggest changing currentProcessingKeysOpState to a UnionList. This would ensure that all keys are retained even after a parallelism change or a restart. After that, we can simply iterate through the list and remove any keys that fall outside the current key range.
